support large schema discovery #17394
Conversation
Force-pushed from 8c1a4dc to 54eba7e.
@mfsiega-airbyte I gave this a first pass! I don't have a lot of hands-on experience with the Discover workflow, so I'm gonna tag in @davinchia to give this a review as well.
I think the approach looks good to me. Left a few suggestions around types, naming, etc.
For my own understanding, which parts of this were blocked on removing the catalog from the response of the connections list endpoint? I wanted to double check that blocker and make sure we're in the clear
airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml
airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml
    description: Connector version
    type: string
  configHash:
    description: Config hash
likewise, examples are always nice for generic 'string' type properties
Wasn't exactly sure what an example of a hash should look like :) but I did put in some extra explanation?
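For illustration, the kind of property definition being discussed might look like this (a sketch only; the exact contents of `JobDiscoverCatalogConfig.yaml` may differ, and the example hash value is made up):

```yaml
configHash:
  description: Hash of the connector configuration at discovery time, used to detect whether the config changed since the catalog was fetched.
  type: string
  examples:
    - "99914b932bd37a50b983c5e7c90ae93b"
```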
    return discoverJobToOutput(response);
  }

-  private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse<AirbyteCatalog> response) {
+  private SourceDiscoverSchemaRead discoverJobToOutput(final SynchronousResponse<String> response) throws ConfigNotFoundException, IOException {
Now that this method is actually fetching from the database, I think a rename is appropriate so that it doesn't sound like it's doing a simple in-memory conversion operation
agree
@@ -257,16 +251,19 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final So
     final SourceConnection source = new SourceConnection()
         .withSourceDefinitionId(sourceCreate.getSourceDefinitionId())
         .withConfiguration(partialConfig);
-    final SynchronousResponse<AirbyteCatalog> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName);
+    final SynchronousResponse<String> response = synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, sourceDef.getDockerImageTag());
It definitely feels like we've lost some readability with the change from `AirbyteCatalog` to `String` here. If the ID were a UUID, it'd be pretty clear, i.e. `SynchronousResponse<UUID>`, but as a reader it feels like `String` could be anything. If this is actually a UUID, maybe we can find a way to type it as such so these response types are clear?
I think it's a UUID, so we should be able to type it as such. I agree we should make clear this refers to a database id.
Attempted to make this a bit clearer (on top of the UUID change indeed).
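The typing point in this thread can be illustrated with a minimal, self-contained sketch. The class and method shapes below are illustrative stand-ins, not Airbyte's actual code: typing the response payload as `UUID` documents that it is a database id, whereas `SynchronousResponse<String>` could be carrying anything.

```java
import java.util.UUID;

// Illustrative stand-in for a generic scheduler response (not Airbyte's
// actual class): the type parameter documents what the payload is.
final class SynchronousResponse<T> {

  private final T output;

  SynchronousResponse(final T output) {
    this.output = output;
  }

  T getOutput() {
    return output;
  }
}

final class DiscoverTypingDemo {

  // With the UUID-typed response, a reader immediately sees that the
  // discover job returns the persisted catalog's database id.
  static SynchronousResponse<UUID> createDiscoverSchemaJob() {
    return new SynchronousResponse<>(UUID.randomUUID());
  }
}
```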
-        .withDockerImage(dockerImage);
+        .withDockerImage(dockerImage)
+        .withSourceId(source.getSourceId().toString())
+        .withConfigHash(HASH_FUNCTION.hashBytes(Jsons.serialize(source.getConfiguration()).getBytes(
This feels like the kind of thing that should maybe go into a helper or util class, and we can add some javadoc to explain why and how we're hashing the config. Otherwise I'd worry that this starts getting copy/pasted elsewhere and becomes hard to change if we ever need to
Yeah this is a good call - Davin opened https://github.com/airbytehq/airbyte/issues/17488.
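A centralized helper along these lines is what the comment is asking for. This is a hypothetical sketch, not the actual implementation: the PR uses Guava's `Hashing.md5()`, while this version uses the JDK's `MessageDigest` so it stays dependency-free. The point is that every caller goes through one place, so the algorithm can't silently diverge between writers and readers of the hash.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical centralized config-hash helper. Keeping a single definition
// avoids the cache-invalidation bugs that come from two call sites hashing
// the config differently. MD5 is acceptable here because the hash is a
// change-detection key, not a security boundary.
public final class ConfigHashing {

  private ConfigHashing() {}

  /** Hashes a serialized connector configuration for catalog fetch-event dedup. */
  public static String hashConfig(final String serializedConfig) {
    try {
      final MessageDigest md5 = MessageDigest.getInstance("MD5");
      final byte[] digest = md5.digest(serializedConfig.getBytes(StandardCharsets.UTF_8));
      final StringBuilder hex = new StringBuilder();
      for (final byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (final NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }
}
```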
airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java
airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml
airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml
airbyte-config/config-models/src/main/resources/types/StandardDiscoverCatalogInput.yaml
-      return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG).withDiscoverCatalog(catalog.get());
+      final UUID catalogId =
+          configRepository.writeActorCatalogFetchEvent(catalog.get(), UUID.fromString(discoverSchemaInput.getSourceId()),
This should be moved to an api call in the future. Can we quickly write up an issue so we don't forget?
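For context, the pattern under discussion can be sketched in a self-contained form (all names here are illustrative, not Airbyte's actual classes): instead of returning the full catalog through Temporal, the worker persists it and returns only a UUID, and the handler re-reads the catalog by that id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Self-contained sketch of the persist-then-return-id pattern this PR adopts.
final class DiscoverFlowSketch {

  // Stands in for the config database's persisted-catalog storage.
  private static final Map<UUID, String> CATALOG_STORE = new HashMap<>();

  // Worker side: persist the (possibly huge) catalog and return a small id.
  // The UUID is the only value that crosses the Temporal message boundary.
  static UUID runDiscoverWorker(final String discoveredCatalogJson) {
    final UUID catalogId = UUID.randomUUID();
    CATALOG_STORE.put(catalogId, discoveredCatalogJson);
    return catalogId;
  }

  // Handler side: fetch the persisted catalog back out of storage by id.
  static String handleDiscoverResponse(final UUID catalogId) {
    final String catalog = CATALOG_STORE.get(catalogId);
    if (catalog == null) {
      throw new IllegalStateException("no catalog persisted for id " + catalogId);
    }
    return catalog;
  }
}
```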
@@ -38,6 +41,8 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl

   private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSynchronousSchedulerClient.class);

+  private static final HashFunction HASH_FUNCTION = Hashing.md5();
note: there are now multiple instances of this floating about. We should centralise this. This can happen in a follow-up PR, but it definitely needs to happen, otherwise we can bump heads against tricky cache invalidation issues.
Thanks @mfsiega-airbyte , very exciting!
Most of my comments are in the same vein as Parker's. One major comment about centralising the hashing function definition. It doesn't have to happen in this PR but should definitely happen relatively soon after to prevent any cache invalidation errors.
I'm asking Malik to clarify the hashing differences here: https://airbytehq-team.slack.com/archives/C03A8CSANPQ/p1664572102698309
Will review again after you make changes.
@@ -73,7 +78,10 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
     final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration());

     final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
-        .withConnectionConfiguration(fullConfig);
+        .withConnectionConfiguration(fullConfig)
Do we need to version this in Temporal @pmossman ? Or is it ok to not do so because this is an activity impl and not a workflow?
@mfsiega-airbyte this looks good to me.
One open question from me on versioning to make sure we don't cause errors. We can address the hashing in a follow up PR.
Lmk if there are parts you want me to look at more closely.
Re workflow versioning: once #17562 is merged, any schema discovery workflows that are underway during a deployment will fail and need to be retried (rather than getting stuck). Since this is relatively easy to recover from, planning to go ahead rather than specifically handling versioning (as soon as the linked PR is merged).
@mfsiega-airbyte integration tests for mysql-source are broken: https://github.com/airbytehq/airbyte/actions/runs/3194916543/jobs/5215016534
@suhomud I can take a look, but it might take some time, so if you have a more specific error/etc. that points to that PR, I can probably track it down more quickly.
@mfsiega-airbyte looking as well. From what I see so far, I assume mysql is not the only source affected.
Yeah indeed - these tests were never passing. I'll send a PR to fix shortly.
@mfsiega-airbyte thanks for the quick fix! I ran the integration tests on #17662
Looks like the tests passed and I merged, lmk if there are still issues!
…vation

* master: (26 commits)
  supply a source id for schema discovery in connector integration tests (#17662)
  Source Iterable: Add permission check for stream (#17602)
  Moving TrackingClientSingleton.initialize into the bean itself (#17631)
  Handle null workspace IDs in tracking/reporting methods gracefully (#17641)
  Bump Airbyte version from 0.40.11 to 0.40.12 (#17653)
  Revert "Do not wait the end of a reset to return an update (#17591)" (#17640)
  Standardize HttpRequester's url_base and path format (#17524)
  Create geography_type enum and add geography column in connection and workspace table (#16818)
  airbyte-cron: update connector definitions from remote (#16438)
  Do not wait the end of a reset to return an update (#17591)
  Remove redundant title labels from connector specs (#17544)
  Updated GA4 status
  support large schema discovery (#17394)
  🪟 🐛 Fixes connector checks not properly ending their loading state (#17620)
  🪟🧪 [Experiment] add hideOnboarding experiment (#17605)
  Source Recharge: change releaseStage to GA (#17606)
  Source Recharge: skip stream if 403 received (#17608)
  remove sonar-scan workflow (#17609)
  Mark/tables should be full width on all pages (#17401)
  Auto fail all workfow if there is a Versioning issue (#17562)
  ...
* support large schema discovery
* update generic source tests to handle new approach to schema discovery
* readability improvements related to schema discovery and large schema support
* update internal ScheduleHandler method name
* update source tests per new schema discovery interface
What
Support large catalog (aka schema) discovery.
How
Today, the `discover schema` job passes the discovered schema back as a return value, and the API handler then persists the schema and returns it (code ref: https://github.com/airbytehq/airbyte/blob/master/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java#L210). For large schemas, this runs into a Temporal limit which won't pass such a large message. Instead, we will persist the schema in the `discover schema` job and just pass back the id; the handler then uses the id to retrieve the schema out of the db and return it.
Recommended reading order
1. `airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml`: modifies the output interface from the discover schema job.
2. `airbyte-config/config-models/src/main/resources/types/JobDiscoverCatalogConfig.yaml` and `airbyte-config/config-models/src/main/resources/types/StandardDiscoverCatalogInput.yaml`: modify the input interface to the discover schema job, including some bits of info that are only necessary to persist alongside the catalog. This is probably not ideal, but could perhaps be cleaned up after the fix is merged.
3. `airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java`: for changes in the API handler.
4. `airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java`: where we actually persist the catalog.
Tests
Updated unit tests to exercise the new behavior.
For manual e2e testing, used `make-big-schema.sh` to make a schema with 15k tables. Verified that this failed under the existing behaviour and succeeded with this PR.
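For a sense of what such manual test setup involves, here is a hedged sketch (the actual `make-big-schema.sh` is a shell script whose contents are not shown in this PR; the class and table names below are made up): generate `CREATE TABLE` statements for thousands of tables and feed them into a test database.

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch of a big-schema generator for manual testing. Emits one
// CREATE TABLE statement per table; pipe the output into a test database
// to reproduce the large-catalog discovery scenario.
public final class MakeBigSchema {

  public static String createTableStatements(final int numTables) {
    return IntStream.rangeClosed(1, numTables)
        .mapToObj(i -> "CREATE TABLE big_schema_table_" + i + " (id INTEGER PRIMARY KEY, value TEXT);")
        .collect(Collectors.joining("\n"));
  }

  public static void main(final String[] args) {
    System.out.println(createTableStatements(15_000));
  }
}
```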